package com.kaigejava.flink.chapter01;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


/**
 * @author 凯哥Java
 * @description 有界流的-统计数量(基于datastream的api) 有界流处理
 * 和批量处理不同的地方：
 * a:创建执行环境的不同，流处理程序使用的是StreamExecutionEnvironment。批量使用的是：ExecutionEnvironment
 * b:每一步处理转换之后，得到的数据对象类型不同
 * 流：SingleOutputStreamOperator<Tuple2<String, Long>>  批量： FlatMapOperator<String, Tuple2<String,Long>>
 * c:分组操作低啊用的方法不同：流keyBy,参数是匿名函数作为key的选择器(keySelector),指定当前分组的key是什么
 * d:代码需要调用env的execute方法，开始执行任务
 * 执行结果：
 * 3> (hello,1)
 * 7> (flink,1)
 * 3> (hello,2)
 * 2> (java,1)
 * 1> (凯哥,1)
 * 7> (Java,1)
 * 3> (hello,3)
 * 5> (world,1)
 * 2> (java,2)
 * 6> (kaige,1)
 * 执行结果和我们文本的数据不一致，因为是多线程执行的
 * 这个前面的数字，是哪个线程执行的。任务槽，多线程执行的。并行度，当前任务分成多少分执行的。当前电脑是8核的，所以，最大是7
 * 注意：虽然每个并行子任务不同，但是我们观察java和hello可以发现，在同一个任务下，前面数字是相同的
 *
 * @company
 * @since 2022/11/21 16:03
 */
public class BoundedStreamWordCount {

    public static void main(String[] args) throws Exception {
        //1：创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2:读取文件
        DataStreamSource<String> lineDataStreamSource = env.readTextFile("input\\words.txt");
        //3: 进行转换
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDataStreamSource
                .flatMap((String line, Collector<Tuple2<String,Long>> out)-> {
                    String [] words = line.split(" ");
                    //获取每一个words,转换成二元组
                    for (String word : words) {
                        out.collect(Tuple2.of(word,1L));
                    }

                }).returns(Types.TUPLE(Types.STRING,Types.LONG));
        //4:分组
         KeyedStream<Tuple2<String,Long>,String> wordAndOneKs =  wordAndOne.keyBy(t -> t.f0);
        //5：求和
        SingleOutputStreamOperator<Tuple2<String,Long>> result = wordAndOneKs.sum(1);
        //6:打印
        result.print();
        // 7: 启动==>执行
        env.execute();

    }
}
